iT邦幫忙

2025 iThome 鐵人賽

DAY 29
0
Rust

Rust 實戰專案集:30 個漸進式專案從工具到服務系列 第 29

檔案同步服務 - 類似 Dropbox 的檔案同步系統

  • 分享至 

  • xImage
  •  

前言

目前實作到 29 天了,做點不太一樣的東西,製作檔案同步系統
這個系統包含客戶端和伺服器端,能夠自動偵測檔案變更並即時同步到雲端,同時支援多個客戶端之間的檔案同步

學習目標

  • 📁 即時檔案監控與變更偵測
  • 🔄 雙向同步機制
  • 🔐 檔案雜湊驗證與增量同步
  • 🌐 WebSocket 即時通訊
  • 💾 檔案版本管理
  • 🚀 並發處理與效能優化

依賴

Cargo.toml

[package]
name = "file-sync-service"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.35", features = ["full"] }
axum = "0.7"
tower = "0.4"
tower-http = { version = "0.5", features = ["fs", "cors"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
notify = "6.1"
sha2 = "0.10"
hex = "0.4"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.6", features = ["v4", "serde"] }
futures = "0.3"
tokio-tungstenite = "0.21"
# HTTP client used by `SyncClient::initial_sync` to fetch `/sync-state`;
# the `json` feature is required for `Response::json`.
reqwest = { version = "0.11", features = ["json"] }
dashmap = "5.5"
walkdir = "2.4"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"

[dev-dependencies]
tempfile = "3.8"

開始實作

資料結構

use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
use std::path::PathBuf;

/// Snapshot of a single file, exchanged between clients and the server.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileMetadata {
    /// Identifier of this record; `create_file_metadata` assigns a fresh
    /// random UUID (v4) on every call.
    pub id: String,
    /// Path of the file this record describes.
    pub path: PathBuf,
    /// Content hash; `create_file_metadata` fills it with the hex-encoded
    /// SHA-256 digest of the file.
    pub hash: String,
    /// File size in bytes.
    pub size: u64,
    /// Last-modification time of the file, in UTC.
    pub modified: DateTime<Utc>,
    /// Version counter; `create_file_metadata` always sets it to 1.
    pub version: u32,
}

/// A change in a synchronized directory, as sent over the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SyncEvent {
    /// A new file appeared; carries its metadata snapshot.
    FileCreated { metadata: FileMetadata },
    /// An existing file changed; carries its new metadata snapshot.
    FileModified { metadata: FileMetadata },
    /// A file was removed.
    FileDeleted { path: PathBuf },
    /// A directory was created.
    DirectoryCreated { path: PathBuf },
    /// A directory was removed.
    DirectoryDeleted { path: PathBuf },
}

/// Envelope around a [`SyncEvent`]; serialized as JSON on the WebSocket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncMessage {
    /// The change being announced.
    pub event: SyncEvent,
    /// When the sender built the message (UTC).
    pub timestamp: DateTime<Utc>,
    /// Identifier of the originating client; clients use it to skip their
    /// own messages when the server broadcasts them back.
    pub client_id: String,
}

/// Full listing of the files the server knows about, returned by the
/// `/sync-state` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncState {
    /// Metadata of every file currently in the server's store.
    pub files: Vec<FileMetadata>,
    /// Timestamp attached to the listing; the server fills it with the
    /// time the response was built.
    pub last_sync: DateTime<Utc>,
}

檔案雜湊工程

use sha2::{Sha256, Digest};
use std::fs::File;
use std::io::{self, Read};
use std::path::Path;

/// Computes the SHA-256 digest of the file at `path` as a lowercase hex string.
///
/// The file is streamed through a fixed 8 KiB buffer, so memory use does not
/// grow with the file size.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be opened or read.
/// Reads that fail with [`io::ErrorKind::Interrupted`] are retried instead of
/// being reported, as the `Read::read` contract recommends.
pub fn calculate_file_hash(path: &Path) -> io::Result<String> {
    let mut file = File::open(path)?;
    let mut hasher = Sha256::new();
    let mut buffer = [0u8; 8192];

    loop {
        let n = match file.read(&mut buffer) {
            // End of file.
            Ok(0) => break,
            Ok(n) => n,
            // Interrupted before any data was transferred: nothing was
            // consumed, so retrying is safe.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        hasher.update(&buffer[..n]);
    }

    Ok(hex::encode(hasher.finalize()))
}

/// Builds a [`FileMetadata`] snapshot for the file at `path`.
///
/// The record gets a freshly generated UUID, the file's size and modification
/// time as reported by the filesystem, its SHA-256 hash, and version `1`.
///
/// # Errors
///
/// Propagates any I/O error from reading the filesystem metadata, hashing
/// the file, or querying the modification time.
pub fn create_file_metadata(path: &Path) -> io::Result<FileMetadata> {
    let fs_meta = std::fs::metadata(path)?;
    let digest = calculate_file_hash(path)?;

    let id = uuid::Uuid::new_v4().to_string();
    let size = fs_meta.len();
    let modified = fs_meta.modified()?.into();

    Ok(FileMetadata {
        id,
        path: path.to_path_buf(),
        hash: digest,
        size,
        modified,
        version: 1,
    })
}

檔案監控系統

有監控才有同步

use notify::{
    Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
};
use tokio::sync::mpsc;
use std::path::Path;

/// Watches directories with `notify` and forwards the resulting
/// [`SyncEvent`]s over a Tokio channel.
pub struct FileWatcher {
    /// Underlying platform watcher; watching stops when it is dropped.
    watcher: RecommendedWatcher,
    /// Sender handed to `new`; the watcher callback holds its own clone.
    event_tx: mpsc::Sender<SyncEvent>,
}

impl FileWatcher {
    /// Creates a watcher whose callback translates `notify` events into
    /// [`SyncEvent`]s and pushes them into `event_tx`.
    ///
    /// Events that do not map to a [`SyncEvent`] (errors, access events,
    /// files that vanished before they could be hashed, ...) are dropped
    /// instead of being forwarded as a placeholder deletion.
    ///
    /// # Errors
    ///
    /// Returns an error if the platform watcher cannot be created.
    pub fn new(event_tx: mpsc::Sender<SyncEvent>) -> anyhow::Result<Self> {
        let tx = event_tx.clone();

        let watcher = RecommendedWatcher::new(
            move |result: Result<Event, notify::Error>| {
                if let Some(sync_event) = result.ok().and_then(Self::process_event) {
                    // `blocking_send` must not run inside an async context;
                    // NOTE(review): this relies on notify invoking the
                    // handler on its own thread - confirm for the backend in use.
                    // A send error only means the receiver is gone.
                    let _ = tx.blocking_send(sync_event);
                }
            },
            Config::default(),
        )?;

        Ok(Self { watcher, event_tx })
    }

    /// Starts watching `path` and everything below it.
    ///
    /// # Errors
    ///
    /// Returns an error if the path cannot be watched.
    pub fn watch(&mut self, path: &Path) -> anyhow::Result<()> {
        self.watcher.watch(path, RecursiveMode::Recursive)?;
        Ok(())
    }

    /// Translates one `notify` event into a [`SyncEvent`].
    ///
    /// Only the first path of the event is considered. Returns `None` when
    /// the event carries no path, is of a kind that is not synchronized, or
    /// refers to a file whose metadata could not be read.
    fn process_event(event: Event) -> Option<SyncEvent> {
        let path = event.paths.first()?;

        match event.kind {
            EventKind::Create(_) => {
                if path.is_file() {
                    create_file_metadata(path)
                        .ok()
                        .map(|metadata| SyncEvent::FileCreated { metadata })
                } else {
                    Some(SyncEvent::DirectoryCreated { path: path.clone() })
                }
            }
            EventKind::Modify(_) => {
                if path.is_file() {
                    create_file_metadata(path)
                        .ok()
                        .map(|metadata| SyncEvent::FileModified { metadata })
                } else {
                    None
                }
            }
            // The path no longer exists, so the event kind is the only hint
            // about what was removed. Backends that cannot tell report a
            // different `RemoveKind` and fall through to `FileDeleted`.
            EventKind::Remove(notify::event::RemoveKind::Folder) => {
                Some(SyncEvent::DirectoryDeleted { path: path.clone() })
            }
            EventKind::Remove(_) => Some(SyncEvent::FileDeleted { path: path.clone() }),
            _ => None,
        }
    }
}

同步 server

use axum::{
    extract::{
        ws::{Message, WebSocket, WebSocketUpgrade},
        State,
    },
    response::IntoResponse,
    routing::get,
    Router,
};
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::broadcast;

/// Shared state of the sync server.
pub struct SyncServer {
    /// One broadcast sender per connected WebSocket client, keyed by the
    /// connection id generated in `handle_socket`.
    clients: Arc<DashMap<String, broadcast::Sender<SyncMessage>>>,
    /// Known file metadata, keyed by `FileMetadata::id`.
    file_store: Arc<DashMap<String, FileMetadata>>,
}

impl SyncServer {
    /// Creates a server with no connected clients and an empty file store.
    pub fn new() -> Self {
        Self {
            clients: Arc::new(DashMap::new()),
            file_store: Arc::new(DashMap::new()),
        }
    }

    /// Consumes the server and builds the axum router exposing it:
    /// `/ws` (WebSocket), `/files` and `/sync-state` (JSON listings).
    pub fn router(self) -> Router {
        let state = Arc::new(self);

        Router::new()
            .route("/ws", get(ws_handler))
            .route("/files", get(list_files))
            .route("/sync-state", get(get_sync_state))
            .with_state(state)
    }

    /// Sends `message` to every connected client.
    ///
    /// Send failures (a client without an active receiver) are ignored;
    /// such clients are removed when their socket handler finishes.
    pub async fn broadcast_event(&self, message: SyncMessage) {
        for client in self.clients.iter() {
            let _ = client.value().send(message.clone());
        }
    }

    /// Inserts or replaces the record for `metadata`.
    ///
    /// Records are keyed by id, but every metadata snapshot produced by
    /// `create_file_metadata` carries a new random id. Any older record
    /// describing the same path is therefore dropped first, so repeated
    /// modifications of one file do not pile up as duplicates in the
    /// listings. This scans the whole store on each update.
    pub fn update_file(&self, metadata: FileMetadata) {
        self.file_store
            .retain(|id, existing| *id == metadata.id || existing.path != metadata.path);
        self.file_store.insert(metadata.id.clone(), metadata);
    }

    /// Removes the record with the given id, if present.
    pub fn remove_file(&self, id: &str) {
        self.file_store.remove(id);
    }
}

async fn ws_handler(
    ws: WebSocketUpgrade,
    State(server): State<Arc<SyncServer>>,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_socket(socket, server))
}

async fn handle_socket(socket: WebSocket, server: Arc<SyncServer>) {
    let client_id = uuid::Uuid::new_v4().to_string();
    let (tx, mut rx) = broadcast::channel(100);
    
    server.clients.insert(client_id.clone(), tx);
    
    let (mut sender, mut receiver) = socket.split();

    // 接收客戶端訊息
    let server_clone = server.clone();
    let receive_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = receiver.next().await {
            if let Message::Text(text) = msg {
                if let Ok(sync_msg) = serde_json::from_str::<SyncMessage>(&text) {
                    match sync_msg.event {
                        SyncEvent::FileCreated { ref metadata } |
                        SyncEvent::FileModified { ref metadata } => {
                            server_clone.update_file(metadata.clone());
                        }
                        SyncEvent::FileDeleted { .. } => {
                            // 處理檔案刪除
                        }
                        _ => {}
                    }
                    
                    server_clone.broadcast_event(sync_msg).await;
                }
            }
        }
    });

    // 發送訊息給客戶端
    let send_task = tokio::spawn(async move {
        while let Ok(msg) = rx.recv().await {
            if let Ok(json) = serde_json::to_string(&msg) {
                if sender.send(Message::Text(json)).await.is_err() {
                    break;
                }
            }
        }
    });

    tokio::select! {
        _ = receive_task => {},
        _ = send_task => {},
    }

    server.clients.remove(&client_id);
}

/// Axum handler for `/files`: returns the metadata of every stored file as
/// a JSON array.
async fn list_files(
    State(server): State<Arc<SyncServer>>,
) -> impl IntoResponse {
    let mut files: Vec<FileMetadata> = Vec::new();
    for entry in server.file_store.iter() {
        files.push(entry.value().clone());
    }

    axum::Json(files)
}

/// Axum handler for `/sync-state`: returns a [`SyncState`] containing every
/// stored file and the current time as `last_sync`, encoded as JSON.
async fn get_sync_state(
    State(server): State<Arc<SyncServer>>,
) -> impl IntoResponse {
    let files = server
        .file_store
        .iter()
        .map(|entry| entry.value().clone())
        .collect();
    let last_sync = Utc::now();

    axum::Json(SyncState { files, last_sync })
}

同步 client

use tokio_tungstenite::{connect_async, tungstenite::Message};
use futures::{SinkExt, StreamExt};

/// Client side of the sync service: watches a local directory and
/// exchanges [`SyncMessage`]s with the server.
pub struct SyncClient {
    /// URL passed to `connect_async` and used to build the `/sync-state`
    /// request.
    server_url: String,
    /// Local directory that is watched and kept in sync.
    sync_dir: PathBuf,
    /// Random id (UUID v4) stamped on outgoing messages and used to ignore
    /// this client's own messages when they are broadcast back.
    client_id: String,
}

impl SyncClient {
    pub fn new(server_url: String, sync_dir: PathBuf) -> Self {
        Self {
            server_url,
            sync_dir,
            client_id: uuid::Uuid::new_v4().to_string(),
        }
    }

    pub async fn start(&self) -> anyhow::Result<()> {
        let (ws_stream, _) = connect_async(&self.server_url).await?;
        let (mut write, mut read) = ws_stream.split();

        // 啟動檔案監控
        let (event_tx, mut event_rx) = mpsc::channel(100);
        let mut watcher = FileWatcher::new(event_tx)?;
        watcher.watch(&self.sync_dir)?;

        // 發送本地事件到伺服器
        let client_id = self.client_id.clone();
        let send_task = tokio::spawn(async move {
            while let Some(event) = event_rx.recv().await {
                let message = SyncMessage {
                    event,
                    timestamp: Utc::now(),
                    client_id: client_id.clone(),
                };
                
                if let Ok(json) = serde_json::to_string(&message) {
                    let _ = write.send(Message::Text(json)).await;
                }
            }
        });

        // 接收伺服器事件並同步到本地
        let sync_dir = self.sync_dir.clone();
        let my_client_id = self.client_id.clone();
        let receive_task = tokio::spawn(async move {
            while let Some(Ok(msg)) = read.next().await {
                if let Message::Text(text) = msg {
                    if let Ok(sync_msg) = serde_json::from_str::<SyncMessage>(&text) {
                        // 忽略自己發送的事件
                        if sync_msg.client_id == my_client_id {
                            continue;
                        }
                        
                        Self::apply_sync_event(&sync_dir, sync_msg.event).await;
                    }
                }
            }
        });

        tokio::select! {
            _ = send_task => {},
            _ = receive_task => {},
        }

        Ok(())
    }

    async fn apply_sync_event(sync_dir: &Path, event: SyncEvent) {
        match event {
            SyncEvent::FileCreated { metadata } |
            SyncEvent::FileModified { metadata } => {
                let local_path = sync_dir.join(&metadata.path);
                // 這裡需要從伺服器下載檔案內容
                println!("需要同步檔案: {:?}", local_path);
            }
            SyncEvent::FileDeleted { path } => {
                let local_path = sync_dir.join(&path);
                if local_path.exists() {
                    let _ = std::fs::remove_file(local_path);
                }
            }
            SyncEvent::DirectoryCreated { path } => {
                let local_path = sync_dir.join(&path);
                let _ = std::fs::create_dir_all(local_path);
            }
            SyncEvent::DirectoryDeleted { path } => {
                let local_path = sync_dir.join(&path);
                let _ = std::fs::remove_dir_all(local_path);
            }
        }
    }

    pub async fn initial_sync(&self) -> anyhow::Result<()> {
        // 獲取伺服器端的檔案狀態
        let client = reqwest::Client::new();
        let response = client
            .get(format!("{}/sync-state", self.server_url))
            .send()
            .await?;
        
        let state: SyncState = response.json().await?;

        // 比對本地檔案並同步
        for remote_file in state.files {
            let local_path = self.sync_dir.join(&remote_file.path);
            
            if !local_path.exists() {
                println!("需要下載: {:?}", remote_file.path);
                // 下載檔案
            } else if let Ok(local_meta) = create_file_metadata(&local_path) {
                if local_meta.hash != remote_file.hash {
                    println!("需要更新: {:?}", remote_file.path);
                    // 更新檔案
                }
            }
        }

        Ok(())
    }
}

主程式

use tracing_subscriber;

/// Command-line entry point.
///
/// * `server [port]` starts the sync server (default port 3000).
/// * `client <server-url> <sync-dir>` starts a sync client.
///
/// Missing arguments and unknown commands print a message and exit
/// successfully. A port that is present but not a valid `u16` is reported
/// as an error instead of silently falling back to the default.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();

    let args: Vec<String> = std::env::args().collect();
    // argv[0] is not guaranteed to exist; fall back to the package name.
    let program = args
        .first()
        .map(String::as_str)
        .unwrap_or("file-sync-service");

    let Some(command) = args.get(1) else {
        println!("使用方式:");
        println!("  {} server [port]", program);
        println!("  {} client <server-url> <sync-dir>", program);
        return Ok(());
    };

    match command.as_str() {
        "server" => {
            let port = match args.get(2) {
                Some(raw) => raw
                    .parse::<u16>()
                    .map_err(|err| anyhow::anyhow!("無效的連接埠 {:?}: {}", raw, err))?,
                None => 3000,
            };

            start_server(port).await?;
        }
        "client" => {
            let (Some(server_url), Some(sync_dir)) = (args.get(2), args.get(3)) else {
                println!("請提供伺服器 URL 和同步目錄");
                return Ok(());
            };

            start_client(server_url.clone(), PathBuf::from(sync_dir)).await?;
        }
        other => {
            println!("未知命令: {}", other);
        }
    }

    Ok(())
}

/// Binds `0.0.0.0:<port>` and serves the sync server's router until the
/// server stops.
///
/// # Errors
///
/// Returns an error if the address cannot be bound or serving fails.
async fn start_server(port: u16) -> anyhow::Result<()> {
    let router = SyncServer::new().router();

    let bind_addr = format!("0.0.0.0:{}", port);
    println!("🚀 同步伺服器啟動於 {}", bind_addr);

    let tcp = tokio::net::TcpListener::bind(&bind_addr).await?;
    axum::serve(tcp, router).await?;

    Ok(())
}

/// Creates the sync directory when it does not exist, runs the initial
/// sync against the server, then watches for changes until the connection
/// ends.
///
/// # Errors
///
/// Returns an error if the directory cannot be created or if the initial
/// sync or the watch loop fails.
async fn start_client(server_url: String, sync_dir: PathBuf) -> anyhow::Result<()> {
    let dir_missing = !sync_dir.exists();
    if dir_missing {
        std::fs::create_dir_all(&sync_dir)?;
    }

    let sync_client = SyncClient::new(server_url, sync_dir);

    println!("🔄 執行初始同步...");
    sync_client.initial_sync().await?;

    println!("👀 開始監控檔案變更...");
    sync_client.start().await?;

    Ok(())
}

這裏寫一些測試

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// SHA-256 of the empty input.
    const EMPTY_SHA256: &str =
        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
    /// SHA-256 of the ASCII bytes `Hello, World!` (no trailing newline).
    const HELLO_WORLD_SHA256: &str =
        "dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182ea94";

    /// Writes `contents` to `name` inside `dir` and returns the full path.
    fn write_fixture(dir: &TempDir, name: &str, contents: &[u8]) -> std::path::PathBuf {
        let path = dir.path().join(name);
        std::fs::write(&path, contents).unwrap();
        path
    }

    #[test]
    fn test_file_hash() {
        let temp_dir = TempDir::new().unwrap();

        // Pin the exact digests instead of only checking for non-emptiness,
        // so a wrong algorithm or encoding is caught.
        let hello = write_fixture(&temp_dir, "test.txt", b"Hello, World!");
        assert_eq!(calculate_file_hash(&hello).unwrap(), HELLO_WORLD_SHA256);

        let empty = write_fixture(&temp_dir, "empty.txt", b"");
        assert_eq!(calculate_file_hash(&empty).unwrap(), EMPTY_SHA256);
    }

    #[test]
    fn test_file_hash_missing_file() {
        let temp_dir = TempDir::new().unwrap();
        let missing = temp_dir.path().join("does-not-exist.txt");

        assert!(calculate_file_hash(&missing).is_err());
    }

    #[test]
    fn test_file_metadata() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = write_fixture(&temp_dir, "test.txt", b"Test content");

        let metadata = create_file_metadata(&file_path).unwrap();
        assert_eq!(metadata.size, 12);
        assert_eq!(metadata.path, file_path);
        assert_eq!(metadata.version, 1);
        assert_eq!(metadata.hash, calculate_file_hash(&file_path).unwrap());
    }

    // Nothing in this test awaits, so a plain `#[test]` is sufficient.
    #[test]
    fn test_sync_server() {
        let server = SyncServer::new();

        let metadata = FileMetadata {
            id: "test-1".to_string(),
            path: PathBuf::from("test.txt"),
            hash: "abc123".to_string(),
            size: 100,
            modified: Utc::now(),
            version: 1,
        };

        server.update_file(metadata);
        assert!(server.file_store.contains_key("test-1"));

        server.remove_file("test-1");
        assert!(!server.file_store.contains_key("test-1"));
    }
}

開始使用

啟動 server

cargo run -- server 3000

啟動 client

# 客戶端 1
cargo run -- client ws://localhost:3000/ws ./sync_folder_1

# 客戶端 2
cargo run -- client ws://localhost:3000/ws ./sync_folder_2

測試同步

# 在 sync_folder_1 中建立檔案
echo "Hello" > ./sync_folder_1/test.txt

# 客戶端 2 會收到事件並印出「需要同步檔案: ...」
# 注意:上面的程式尚未實作檔案內容的下載,補上下載邏輯後檔案才會出現在 sync_folder_2
cat ./sync_folder_2/test.txt  # 下載邏輯完成後輸出: Hello

參考


上一篇
個人任務管理 API - 完整的 RESTful 待辦事項服務
下一篇
微服務閘道器 - 實作 API Gateway 與負載均衡 & (後記完賽感言)
系列文
Rust 實戰專案集:30 個漸進式專案從工具到服務30
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言